package com.offcn.bigdata.spark.p2

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkRDD算子操作之transformation
  *  rdd的大多数XxxByKey的transformation算子都是combineByKeyWithClassTag这个算来实现。
  *   combineByKey正是这个算子的简化版本
  *   那aggregateByKey有何combineByKey是啥关系呢
  *
  *   combineByKey和aggregateByKey的关系就相当于reduceByKey和foldByKey之间的关系
  *
  */
object _02TransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                    .setAppName("TransformationOps")
                    .setMaster("local[*]")
        val sc = new SparkContext(conf)

//        cbk2gbk(sc)
        cbk2rbkOps(sc)
        sc.stop()
    }

    def cbk2rbkOps(sc: SparkContext): Unit = {
        val lines = sc.parallelize(List(
            "hello you",
            "hello me",
            "hello lan lan"
        ))

        val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))
        def createCombiner(num: Int): Int = {
            num
        }
        def mergeValue(sum: Int, num: Int): Int = {
            sum + num
        }
        def mergeCombiners(sum1: Int, sum2: Int): Int = {
            sum1 + sum2
        }
        val ret = pairs.combineByKey(createCombiner, mergeValue, mergeCombiners)

        ret.foreach(println)
    }
    /*
        使用combineByKey去模拟groupBykey

        统计1+...+100
        集中式的做法：
        var sum = 0 也可以等于1
        for(i <- 1 to 100) {
            sum += i
        }
        分布式的做法
        把这100个数分成5分
        第一份：1+...+20
        var sum1 = 0 也可以等于1
            这里让其等于1
        for(i <- 2 to 20) {
            sum1 = sum1 + i
        }
        第二份：21+...+40]
            var sum2 = 0 也可以等于21
            这里让其等于21
            for(i <- 22 to 20) {
                sum2 = sum2 + i
            }
        第三份：41+...+60
            var sum3 = 0 也可以等于41
            这里让其等于41
            for(i <- 42 to 20) {
                sum3 = sum3 + i
            }
        第四份：61+...+80
            var sum4 = 0 也可以等于61
            这里让其等于61
            for(i <- 62 to 20) {
                sum4 = sum4 + i
            }
        第五份：81+...+100
            var sum5 = 0 也可以等于81
            这里让其等于81
            for(i <- 82 to 20) {
                sum5 = sum5 + i
            }
        经过并行计算，就得到了5个sum，分别为sum1, sum2, sum3, sum4, sum5---》sum的集合或者数组
        Array(sum1, sum2, sum3, sum4, sum5)
     */
    def cbk2gbk(sc: SparkContext): Unit = {
        case class Student(id: Int, name: String, province: String)
        var stuRDD = sc.parallelize(List(
            Student(1, "刘博", "天津"),
            Student(4, "何浩", "湖南"),
            Student(10, "王鑫达", "天津"),
            Student(6, "范帅", "湖南"),
            Student(2, "霍龙飞", "山西"),
            Student(3, "付云瑾", "山东"),
            Student(7, "孟阳阳", "山西"),
            Student(5, "龙韬", "湖南"),
            Student(10087, "成思远", "山西"),
            Student(8, "吴延俊", "山东"),
            Student(10086, "刘武", "湖南"),
            Student(10089, "小岚岚", "山东")
        ), 3)
        stuRDD = stuRDD.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"stuRDD中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.toIterator
        })
        println("------------groupByKey----------------------")
        val province2Info: RDD[(String, Student)] = stuRDD.map(stu => (stu.province, stu))
        val province2Infos: RDD[(String, Iterable[Student])] = province2Info.groupByKey()

        province2Infos.foreach{case (province, stus) => {
            println(s"${province}对应的学生有：${stus.toList}")
        }}
        println("--------combineByKey----groupByKey------------")
        /*
        如何理解聚合函数？切入点就是如何理解分布式计算？总--->分--->总
            createCombiner: V => C, 相同的Key在分区中会调用一次该函数，用于创建聚合之后的类型，为了和后续Key相同的数据进行聚合。使用分区中的一条记录进行初始化。
            mergeValue: (C, V) => C, 在相同分区中基于上述createCombiner基础之上的局部聚合
            mergeCombiners: (C, C) => C) 将每个分区中相同key聚合的结果在分区间进行全局聚合
         */
        def createCombiner(stu: Student): Array[Student] = {
            println("--createCombiner---: " + stu)
            Array(stu)
        }

        //分区内的聚合统计
        def mergeValue(stus:Array[Student], stu: Student): Array[Student] = {
            println(s"---mergeValue: stus: ${array2Str(stus)}, stu: ${stu}")
            stus.+:(stu)
        }
        //分区间的聚合
        def mergeCombiners(stus1: Array[Student], stus2: Array[Student]): Array[Student] = {
            println(s"---mergeCombiners: stus1: ${array2Str(stus1)}, stus2: ${array2Str(stus2)}")
            stus1 ++ stus2
        }

        val ret = province2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)

        ret.foreach{case (province, stus) => {
            println(s"${province}对应的学生有：${stus.toList}")
        }}
    }

    def array2Str[T](array: Array[T]): String = {
        array.mkString("[", ", ", "]")
    }
}
